/**************************************************************************************
 *  Copyright (c) 2019- Gabriele Mencagli
 *  
 *  This file is part of WindFlow.
 *  
 *  WindFlow is free software dual licensed under the GNU LGPL or MIT License.
 *  You can redistribute it and/or modify it under the terms of the
 *    * GNU Lesser General Public License as published by
 *      the Free Software Foundation, either version 3 of the License, or
 *      (at your option) any later version
 *    OR
 *    * MIT License: https://github.com/ParaGroup/WindFlow/blob/master/LICENSE.MIT
 *  
 *  WindFlow is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU Lesser General Public License for more details.
 *  You should have received a copy of the GNU Lesser General Public License and
 *  the MIT License along with WindFlow. If not, see <http://www.gnu.org/licenses/>
 *  and <http://opensource.org/licenses/MIT/>.
 **************************************************************************************
 */

/** 
 *  @file    source_shipper.hpp
 *  @author  Gabriele Mencagli
 *  
 *  @brief Source_Shipper class used to send outputs generated by the Source operator
 *  
 *  @section Source_Shipper (Description)
 *  
 *  This file implements the Source_Shipper class used to send outputs generated by
 *  the Source operator.
 */ 

#ifndef SOURCE_SHIPPER_H
#define SOURCE_SHIPPER_H

/// includes
#include<ff/multinode.hpp>
#include<basic.hpp>
#include<single_t.hpp>
#if defined (WF_TRACING_ENABLED)
    #include<stats_record.hpp>
#endif
#include<basic_emitter.hpp>

namespace wf {

/** 
 *  \class Source_Shipper
 *  
 *  \brief Source_Shipper class used to send the outputs generated by the Source operator
 *  
 *  This class implements the Source_Shipper class used to send outputs generated by
 *  the Source operator.
 */ 
template<typename result_t>
class Source_Shipper
{
private:
    template<typename T1> friend class Source_Replica;
    template<typename T1> friend class KafkaSource_Replica;
    Basic_Emitter *emitter; // pointer to the emitter used for the delivery of messages
    ff::ff_monode *node; // pointer to the fastflow node to be passed to the emitter
    Execution_Mode_t execution_mode; // execution mode of the PipeGraph
    Time_Policy_t time_policy; // time mode of the PipeGraph
    uint64_t num_delivered; // counter of the delivered results
    uint64_t max_timestamp; // maximum timestamp emitted by the source so far
    uint64_t watermark; // watermark to be used for sending the next output
    uint64_t initial_time_us = 0; // initial time in usec
    doEmit_t doEmit = nullptr; // pointer to the doEmit method of the Emitter

#if defined (WF_TRACING_ENABLED)
    Stats_Record *stats_record = nullptr;
    double avg_ts_us = 0;
    double avg_td_us = 0;
    volatile uint64_t startTD, endTD;
#endif

    // Constructor
    Source_Shipper(Basic_Emitter *_emitter,
                   ff::ff_monode *_node,
                   Execution_Mode_t _execution_mode,
                   Time_Policy_t _time_policy):
                   emitter(_emitter),
                   node(_node),
                   execution_mode(_execution_mode),
                   time_policy(_time_policy),
                   num_delivered(0),
                   max_timestamp(0),
                   watermark(0)
    {
        doEmit = emitter->get_doEmit();
    }

    // Copy Constructor
    Source_Shipper(const Source_Shipper &_other):
                   node(_other.node),
                   execution_mode(_other.execution_mode),
                   time_policy(_other.time_policy),
                   num_delivered(_other.num_delivered),
                   max_timestamp(_other.max_timestamp),
                   watermark(_other.watermark)
    {
        if (_other.emitter != nullptr) {
            emitter = (_other.emitter)->clone();
            doEmit = emitter->get_doEmit();
        }
        else {
            emitter = nullptr;
        }
#if defined (WF_TRACING_ENABLED)
        stats_record = _other.stats_record;
#endif
    }

    // Destructor
    ~Source_Shipper()
    {
        if (emitter != nullptr) {
            delete emitter;
        }
    }

    // Set the initial time in usec
    void setInitialTime(uint64_t _initial_time_us)
    {
        initial_time_us = _initial_time_us;
    }

    // Set the execution and time mode of the Source_Shipper
    void setConfiguration(Execution_Mode_t _execution_mode, Time_Policy_t _time_policy)
    {
        execution_mode = _execution_mode;
        time_policy = _time_policy;
    }

#if defined (WF_TRACING_ENABLED)
    // Set the pointer to the Stats_Record object
    void setStatsRecord(Stats_Record *_stats_record)
    {
        stats_record = _stats_record;
    }
#endif

    // Flushing function of the shipper
    void flush()
    {
        emitter->flush(node); // call the flush of the emitter
    }

public:
    /** 
     *  \brief Get the number of results delivered by the Source_Shipper
     *  
     *  \return number of results
     */ 
    uint64_t getNumDelivered() const
    {
        return num_delivered;
    }

    /** 
     *  \brief Send a new data item in the data-flow graph. Its timestamp is
     *         automatically assigned by the runtime system. It can be used
     *         with INGRESS_TIME policy only
     *  
     *  \param _r result to be delivered (copy semantics)
     */ 
    void push(const result_t &_r)
    {
#if defined (WF_TRACING_ENABLED)
        if (stats_record->outputs_sent == 0) {
            startTD = current_time_nsecs();
        }
#endif
        if (time_policy != Time_Policy_t::INGRESS_TIME) { // this push can be used with INGRESS_TIME policy only
            std::cerr << RED << "WindFlow Error: push(result_t) requires INGRESS_TIME policy" << DEFAULT_COLOR << std::endl;
            exit(EXIT_FAILURE);
        }
        uint64_t timestamp = current_time_usecs() - initial_time_us; // calculate the timestamp
        max_timestamp = timestamp; // ingress_time timestamps are always monotonically increasing
        if (execution_mode == Execution_Mode_t::DEFAULT) {
            watermark = timestamp; // watermarks equal to timestamps in case of DEFAULT mode
        }
        result_t copy_result = _r; // copy the result to be delivered
        doEmit(this->emitter, &copy_result, 0, timestamp, watermark, node);
        num_delivered++;
#if defined (WF_TRACING_ENABLED)
        stats_record->outputs_sent++;
        stats_record->bytes_sent += sizeof(result_t);
        endTD = current_time_nsecs();
        double elapsedtime_us = ((double) (endTD - startTD)) / 1000;
        avg_td_us += (1.0 / stats_record->outputs_sent) * (elapsedtime_us - avg_td_us);
        avg_ts_us = avg_td_us;
        stats_record->eff_service_time = std::chrono::duration<double, std::micro>(avg_td_us);
        stats_record->service_time = stats_record->eff_service_time;
        startTD = current_time_nsecs();
#endif
    }

    /** 
     *  \brief Send a new data item in the data-flow graph. Its timestamp is
     *         automatically assigned by the runtime system. It can be used
     *         with INGRESS_TIME policy only
     *  
     *  \param _r result to be delivered (move semantics)
     */ 
    void push(result_t &&_r)
    {
#if defined (WF_TRACING_ENABLED)
        if (stats_record->outputs_sent == 0) {
            startTD = current_time_nsecs();
        }
#endif
        if (time_policy != Time_Policy_t::INGRESS_TIME) { // this push can be used with INGRESS_TIME policy only
            std::cerr << RED << "WindFlow Error: push(result_t) requires INGRESS_TIME policy" << DEFAULT_COLOR << std::endl;
            exit(EXIT_FAILURE);
        }
        uint64_t timestamp = current_time_usecs() - initial_time_us; // calculate the timestamp
        max_timestamp = timestamp; // ingress_time timestamps are always monotonically increasing
        if (execution_mode == Execution_Mode_t::DEFAULT) {
            watermark = timestamp; // watermarks equal to timestamps in case of DEFAULT mode
        }
        doEmit(this->emitter, &_r, 0, timestamp, watermark, node);
        num_delivered++;
#if defined (WF_TRACING_ENABLED)
        stats_record->outputs_sent++;
        stats_record->bytes_sent += sizeof(result_t);
        endTD = current_time_nsecs();
        double elapsedtime_us = ((double) (endTD - startTD)) / 1000;
        avg_td_us += (1.0 / stats_record->outputs_sent) * (elapsedtime_us - avg_td_us);
        avg_ts_us = avg_td_us;
        stats_record->eff_service_time = std::chrono::duration<double, std::micro>(avg_td_us);
        stats_record->service_time = stats_record->eff_service_time;
        startTD = current_time_nsecs();
#endif
    }

    /** 
     *  \brief Send a new data item in the data-flow graph. Its timestamp is
     *         user defined. It can be used with EVENT_TIME policy only
     *  
     *  \param _r result to be delivered (copy semantics)
     *  \param _ts timestamp value (in microseconds starting from zero)
     */ 
    void pushWithTimestamp(const result_t &_r, uint64_t _ts)
    {
#if defined (WF_TRACING_ENABLED)
        if (stats_record->outputs_sent == 0) {
            startTD = current_time_nsecs();
        }
#endif
        if (time_policy != Time_Policy_t::EVENT_TIME) { // this pushWithTimestamp can be used with EVENT_TIME policy only
            std::cerr << RED << "WindFlow Error: pushWithTimestamp(result_t, uint64_t) requires EVENT_TIME policy" << DEFAULT_COLOR << std::endl;
            exit(EXIT_FAILURE);
        }
        if (_ts >= max_timestamp) { // check if it is the maximum timestamp emitted so far
            max_timestamp = _ts;
        }
        else if (execution_mode == Execution_Mode_t::DETERMINISTIC) { // DETERMINISTIC mode requires monotonic timestamps
            std::cerr << RED << "WindFlow Error: user-defined timestamps must be monotonically increasing in DETERMINISTIC mode" << DEFAULT_COLOR << std::endl;
            exit(EXIT_FAILURE);
        }
        result_t copy_result = _r; // copy the result to be delivered
        doEmit(this->emitter, &copy_result, 0, _ts, watermark, node);
        num_delivered++;
#if defined (WF_TRACING_ENABLED)
        stats_record->outputs_sent++;
        stats_record->bytes_sent += sizeof(result_t);
        endTD = current_time_nsecs();
        double elapsedtime_us = ((double) (endTD - startTD)) / 1000;
        avg_td_us += (1.0 / stats_record->outputs_sent) * (elapsedtime_us - avg_td_us);
        avg_ts_us = avg_td_us;
        stats_record->eff_service_time = std::chrono::duration<double, std::micro>(avg_td_us);
        stats_record->service_time = stats_record->eff_service_time;
        startTD = current_time_nsecs();
#endif
    }

    /** 
     *  \brief Send a new data item in the data-flow graph. Its timestamp is
     *         user defined. It can be used with EVENT_TIME policy only
     *  
     *  \param _r result to be delivered (move semantics)
     *  \param _ts timestamp value (in microseconds starting from zero)
     */ 
    void pushWithTimestamp(result_t &&_r, uint64_t _ts)
    {
#if defined (WF_TRACING_ENABLED)
        if (stats_record->outputs_sent == 0) {
            startTD = current_time_nsecs();
        }
#endif
        if (time_policy != Time_Policy_t::EVENT_TIME) { // this pushWithTimestamp can be used with EVENT_TIME policy only
            std::cerr << RED << "WindFlow Error: pushWithTimestamp(result_t, uint64_t) requires EVENT_TIME policy" << DEFAULT_COLOR << std::endl;
            exit(EXIT_FAILURE);
        }
        if (_ts >= max_timestamp) { // check if it is the maximum timestamp emitted so far
            max_timestamp = _ts;
        }
        else if (execution_mode == Execution_Mode_t::DETERMINISTIC) { // DETERMINISTIC mode requires monotonic timestamps
            std::cerr << RED << "WindFlow Error: user-defined timestamps must be monotonically increasing in DETERMINISTIC mode" << DEFAULT_COLOR << std::endl;
            exit(EXIT_FAILURE);
        }
        doEmit(this->emitter, &_r, 0, _ts, watermark, node);
        num_delivered++;
#if defined (WF_TRACING_ENABLED)
        stats_record->outputs_sent++;
        stats_record->bytes_sent += sizeof(result_t);
        endTD = current_time_nsecs();
        double elapsedtime_us = ((double) (endTD - startTD)) / 1000;
        avg_td_us += (1.0 / stats_record->outputs_sent) * (elapsedtime_us - avg_td_us);
        avg_ts_us = avg_td_us;
        stats_record->eff_service_time = std::chrono::duration<double, std::micro>(avg_td_us);
        stats_record->service_time = stats_record->eff_service_time;
        startTD = current_time_nsecs();
#endif
    }

    /** 
     *  \brief Set the new watermark to be propagated with the next data item.
     *         It can be used with DEFAULT execution mode and EVENT_TIME policy only
     *  
     *  \param _wm new watermark value (in microseconds starting from zero)
     */ 
    void setNextWatermark(uint64_t _wm)
    {
        if (execution_mode != Execution_Mode_t::DEFAULT) { // only DEFAULT mode
            std::cerr << RED << "WindFlow Error: setNextWatermark(uint64_t) requires DEFAULT mode" << DEFAULT_COLOR << std::endl;
            exit(EXIT_FAILURE);
        }
        if (time_policy != Time_Policy_t::EVENT_TIME) { // this emitWatermark can be used with EVENT_TIME policy only
            std::cerr << RED << "WindFlow Error: setNextWatermark(uint64_t) requires EVENT_TIME policy" << DEFAULT_COLOR << std::endl;
            exit(EXIT_FAILURE);
        }        
        if (_wm < watermark) { // check watermarks are monotonically increasing
            std::cerr << RED << "WindFlow Error: watermarks must be monotonically increasing" << DEFAULT_COLOR << std::endl;
            exit(EXIT_FAILURE);
        }
        if (_wm > max_timestamp) { // current watermark cannot be greater than the maximum emitted timestamp
            std::cerr << RED << "WindFlow Error: watermark cannot be greater than the highest emitted timestamp" << DEFAULT_COLOR << std::endl;
            exit(EXIT_FAILURE);
        }
        watermark = _wm;
    }

    /** 
     *  \brief Emit an explicit punctuaction message conveying a new user-defined
     *         watermark value. It can be used with DEFAULT execution mode and
     *         EVENT_TIME policy only
     *  
     *  \param _wm new watermark value (in microseconds starting from zero)
     */ 
    void emitWatermark(uint64_t _wm)
    {
        if (execution_mode != Execution_Mode_t::DEFAULT) { // only DEFAULT mode
            std::cerr << RED << "WindFlow Error: emitWatermark(uint64_t) requires DEFAULT mode" << DEFAULT_COLOR << std::endl;
            exit(EXIT_FAILURE);
        }
        if (time_policy != Time_Policy_t::EVENT_TIME) { // emitWatermark(uint64_t) can be used with EVENT_TIME policy only
            std::cerr << RED << "WindFlow Error: emitWatermark(uint64_t) requires EVENT_TIME policy" << DEFAULT_COLOR << std::endl;
            exit(EXIT_FAILURE);
        }
        if (_wm < watermark) { // check watermarks are monotonically increasing
            std::cerr << RED << "WindFlow Error: watermarks must be monotonically increasing" << DEFAULT_COLOR << std::endl;
            exit(EXIT_FAILURE);
        }
        if (_wm > max_timestamp) { // current watermark cannot be greater than the maximum emitted timestamp
            std::cerr << RED << "WindFlow Error: watermark cannot be greater than the highest emitted timestamp" << DEFAULT_COLOR << std::endl;
            exit(EXIT_FAILURE);
        }
        watermark = _wm;
        emitter->propagate_punctuation(watermark, node);
    }

    /** 
     *  \brief Emit an explicit punctuaction message conveying a new automatic
     *         watermark value computed by the runtime system based on the current
     *         system time. It can be used with DEFAULT execution mode and INGRESS_TIME
     *         policy only
     */ 
    void emitWatermark()
    {
        if (execution_mode != Execution_Mode_t::DEFAULT) { // only DEFAULT mode
            std::cerr << RED << "WindFlow Error: emitWatermark() requires DEFAULT mode" << DEFAULT_COLOR << std::endl;
            exit(EXIT_FAILURE);
        }
        if (time_policy != Time_Policy_t::INGRESS_TIME) { // emitWatermark() can be used with INGRESS_TIME policy only
            std::cerr << RED << "WindFlow Error: emitWatermark() requires INGRESS_TIME policy" << DEFAULT_COLOR << std::endl;
            exit(EXIT_FAILURE);
        }
        uint64_t wm = current_time_usecs() - initial_time_us; // calculate the current time
        if (wm < watermark) { // check watermarks are monotonically increasing
            std::cerr << RED << "WindFlow Error: watermarks must be monotonically increasing" << DEFAULT_COLOR << std::endl;
            exit(EXIT_FAILURE);
        }
        if (wm > max_timestamp) { // current watermark cannot be greater than the maximum emitted timestamp
            std::cerr << RED << "WindFlow Error: watermark cannot be greater than the highest emitted timestamp" << DEFAULT_COLOR << std::endl;
            exit(EXIT_FAILURE);
        }
        watermark = wm;
        emitter->propagate_punctuation(watermark, node);
    }
};

} // namespace wf

#endif
